F# WebSocket Server

Аннотация

Идея писать веб-сервера и веб-фреймворки на всех языках у меня возникла с тех пор, когда я понял, что то, что я сделал для экосистемы Erlang: направление фреймворков для предприятий под общим брендом N2O, а сейчас как часть платформы erp.uno; вполне применимо и для других языков и платформ. В этой статье представлена версия вебсокет-сервера для языка программирования F# — ws.erp.uno. Адрес пакета: nuget.org/packages/ws. Адрес репозитория: erpuno/ws.

Предисловие

Haskell. Первый эксперимент был совершен Андреем Мельниковым в виде порта для Хаскеля: N2O.HS, позже более полную версию с N2O и NITRO с экзестенциальными сигнатурами сделал Марат Хафизов, который управляет Github организацией O3 и сайтом o3.click. Мне совершенно непонятно, почему ни один хаскель программист, которые вроде как должны восхищаться минимализмом, не идет по этому пути, а обычно ищет правды в таких фреймворках как UrWeb, IHP, UnisonWeb. На мой взгляд — всё это переусложненные штуки.

Standard ML. Также в академических целях, Марат Хафизов сделал порт связки веб-сервера N2O и веб-фреймворка NITRO на язык Standard ML (обе главные версии SML/NJ и MLton) — эта работа представлена Github организацией O1. Это тот язык, который я считаю уместно преподавать как первый академический язык программирования (до знакомства с промышленными языками Erlang, F#, Haskell).

Lean. Для закрепления своей идеи и более четкой и точной ее артикуляции я попросил Siegmentation Fault сделать порт на еще более формальный язык программирования, математический прувер Lean 4. Эта версия связки веб-сервера N2O и веб-фреймворка NITRO представлена Github организацией O89 и сразу двумя сайтами: lean4.dev и bum.pm. Последний представляет собой пакетный менеджер написанный на Lean 4, который нам помогает администрировать Александр Темерев из CERN. Lean 4 N2O проекты залайкал Леонардо де Мура, автор Lean и Z3, чему мы безмерно рады.

Идиоматический веб-сервер на F#

Критерии идиоматичности могут каждым восприниматься по разному, но в основном это означаем минимум прелюдий и максимум сути, так или иначе основная мантра всех минималистов в общем и N2O инфраструктуры в частности. Так в современные критерии идиоматичности веб-сервера для языка F# я бы выделил следующее: 1) использование системных классов System.Net.WebSockets, которые уже предоставляют буферизированные енкодер и декодер фреймов стандарта RFC 6455; 2) сервер должен быть построен на Async компютейшинал экспрешинах; 3) для управления асинхронными потоками выполнения должен использоваться MailboxProcessor, а не самописная система воркеров, которая хоть и поможет выжать последнее из F# (у меня получилось 14 миллионов сообщений в секунду), но не продемонстрирует сути, так как будет девиацией в сторону рантаймов; 4) Использование классов TcpListener и TcpClient, NetworkStream. Больше ни чем не разрешается пользоваться!

Что почитать перед написанием?

Немного погуглив, я понял что интернету нехватает статьи, которая описывает историю понятия асинхронных вычислений и вычислительных выражений, которые в народе известны по ключевым словам async/await. Вижу статью, которая называется "Survey of Brief Async history", в которой будет показана ретроспектива Async технологии:

0) J operator 1965;
1) LISP call/cc 1968;
2) Erlang 1986;
3) Concurrent ML 1998;
4) Haskell async 2004;
5) C# async yield 2006;
6) Perl IO:Async 2007;
7) F# Async 2010
8) C#/PHP Async 2012
9) Python async 2015
10) ECMAScript async 2017

Основополагающей статьей по F# async я бы назвал F# Async Guide Лео Городинского, jet.com. Основной книгой, которую я бы порекомендовал полистать перед знакомством с F# — это "Expert F# 4.0" автора языка Дона Сайма. Основной презентацией по F# Async я бы назвал доклад Дона Сайма на митапе в Лондоне — Some F# for the Erlang programmer. Вооружившись этими документами и этим Gist сниппетом я ухал во Львов писать самый идиоматичный вебсокет-сервер.

Витрина

Как обычно принятно в бектрекинг системах, прологах и декларативных языках, будем двигаться с конца, а именно с интерфейса который мы хотим получить. Хочется, чтобы ЭХО-сервер представлял собой функцию id.

open N2O module Program = [<EntryPoint>] let main _ = let mutable ret = 0 try Stream.protocol <- fun x -> x use ws = Server.start "0.0.0.0" 1900 System.Threading.Thread.Sleep -1 with exn -> printfn "EXIT: %s" exn.Message ret <- 1 ret

Архитектура асинхронных процессов

Для тех, кто знаком с архитектурой Erlang/OTP, известно, что проектирование сетевых приложений начинается с дерева супервижина легковесных процессов и протоколов которые определяют их взаимодействие. Подчиненные дочерние процессы обычно разделяют токены времени жизни CancellationToken, благодаря чему исключения возникшие в родительских процессах могут отменить все дерево подпроцессов. Поэтому в циклах процессов присутствует выражение

while not ct.IsCancellationRequested do

Наш вебсокет-сервер состоит из 7 асинхронных процессов:

[Sup] [L]* / / [start]--[S]--[C]* \ \ [H] [T]*

Легенда этого дерева такова: [start] нода представляет собой точку входа, из которой будут рождаться остальные асинхронные процессы, соотвествует функции start; [S] нода соовтествует асинхронному процессу, который представлен функцией listen; [Sup] нода соотвествует функции startSupervisor; [C] нода соотвествует функции startClient; [H] нода соотвествует функции heartbeat; [L] нода соотвествует функции loop; [T] нода соотвествует функции telemetry. Звездочкой будем обозначать процессы, количество которых зависит от количества активных соединений: [C]*, [L]*, [T]*.

Протоколы взаимодействия

В момент рождения клиента [C] в родительском процессе сервера [S] происходит нотификация [S]->[Sup] по так называемому протоколу супервижина Sup с одноименным типом. Публичный протокол публичной функции Stream.protocol представлен типом Msg, который предназначен для управления асинхронным процессом [L].

Система серверных пингов реализована совместимой с протоколами Sup и Msg, хартбит процесс [H] через интервал времени посылает Tick сообщение в супервизор [Sup], который в свою очередь шлет броадкаст для всех клиентов телеметрии [T] созданных на той же очереди, что и [C], т.е. тот же протокол.

Процесы [T], [L] и [C] разделяют WebSocket стрим и все связаны с супервизором сервера, нотифицируя его в случае возникновения исключений.

type Msg = | Bin of byte array | Text of string | Nope type Sup = | Connect of MailboxProcessor<Msg> * WebSocket | Disconnect of MailboxProcessor<Msg> | Close of WebSocket | Tick

RFC 6455 Хендшейк

Функции обработки HTTP хедеров. isWebSocketsUpgrade ищет в хедерах пару Upgrade и WebSocket. getLines возвращает хедеры как массив строк, а функция getKey возвращает значение хедера по его ключу.

let isWebSocketsUpgrade (lines: string array) = Array.exists (fun (x:string) -> "upgrade: websocket" = x.ToLower()) lines let getKey (key: string) arr = try let f (s: String) = s.StartsWith(key) (Array.find f arr).[key.Length + 1..] with _ -> "" let getLines (bytes: byte array) len = if len > 8 then bytes.[..(len - 9)] |> UTF8Encoding.UTF8.GetString |> fun hs -> hs.Split([| "\r\n" |], StringSplitOptions.RemoveEmptyEntries) else [||]

Функция RFC 6455 ответа называется handshake. Этой функциональности насколько мне известно нет в системных неймспейсах.

let acceptString6455 acceptCode = "HTTP/1.1 101 Switching Protocols\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: " + acceptCode + "\r\n\r\n" let handshake lines = (getKey "Sec-WebSocket-Key:" lines) + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" |> Encoding.ASCII.GetBytes |> SHA1CryptoServiceProvider.Create().ComputeHash |> Convert.ToBase64String |> acceptString6455 |> Encoding.ASCII.GetBytes

Асинхронные процессы сервера

Первый процесс [start], представляет собой точку входа, где стартует сразу три процесса: процесс супервизора всех соединений [Sup], процесс сервера слушателя соединений [S] и, если включен флаг Server.ticker, процесс сердцебиений, который работает как интервальный циклический таймер [H]. Эпилог процесса [start] содержит кенселяцию токена глобального для всех подпроцессов при освобождении переменной, которая соджержит вебсокет-сервер во внешнем коде.

let start (addr: string) (port: int) = let cts = new CancellationTokenSource() let token = cts.Token let sup = startSupervisor token let listener = TcpListener(IPAddress.Parse(addr), port) try listener.Start(10) with | :? SocketException -> failwithf "%s:%i is using by another program" addr port | err -> failwithf "%s" err.Message Async.StartImmediate(listen listener token sup, token) if ticker then Async.StartImmediate(heartbeat interval token sup, token) { new IDisposable with member x.Dispose() = cts.Cancel() }

Второй процесс [Sup], супервизор является чистой функцией, которая обрабатывет сообщения супервижин протокола о регистрации и смерти новых соединений. Тут же происходит бродкаст сообщений как реакция на сердцебиение тикера.

let startSupervisor (ct: CancellationToken) = MailboxProcessor.Start( (fun (inbox: MailboxProcessor<Sup>) -> let listeners = ResizeArray<_>() async { while not ct.IsCancellationRequested do match! inbox.Receive() with | Close ws -> () | Connect (l, ns) -> listeners.Add(l) | Disconnect l -> listeners.Remove(l) |> ignore | Tick -> listeners.ForEach(fun l -> l.Post Nope) }), cancellationToken = ct )

Интервальный таймер сердцебиения [H].

let heartbeat (interval: int) (ct: CancellationToken) (sup: MailboxProcessor<Sup>) = async { while not ct.IsCancellationRequested do do! Async.Sleep interval sup.Post(Tick) }

Главный цикл процесса [S], который принимает новые TCP соединения и стартует новых клиентов [C].

let listen (listener: TcpListener) (ct: CancellationToken) (sup: MailboxProcessor<Sup>) = async { while not ct.IsCancellationRequested do let! client = listener.AcceptTcpClientAsync() |> Async.AwaitTask client.NoDelay <- true startClient client sup ct |> ignore }

Асихронный процесс [C] с очередью (MailboxProcessor) обработки TCP соединений или, проще говоря, TCP клиент. Это точка входа для клиентского соединения, именно здесь происходит хендшейк. В случае успешного хендшейка мы шлем RFC 6455 ответ и запускаем сразу два асинхронных процесса: первый это сам цикл обработки вебсокет сообщений [L], а также, если установлен флаг Server.ticker мы запускаем процесс телеметрии [T], который разделяет WebSocket стрим и может осуществлять туда асинхронный сброс сообщений, конкурируя с основным циклом [L]. Такие процессы существуют всегда в паре.

let startClient (tcp: TcpClient) (sup: MailboxProcessor<Sup>) (ct: CancellationToken) = MailboxProcessor.Start( (fun (inbox: MailboxProcessor<Msg>) -> async { let ns = tcp.GetStream() let size = tcp.ReceiveBufferSize let bytes = Array.create size (byte 0) let! len = ns.ReadAsync(bytes, 0, bytes.Length) |> Async.AwaitTask let lines = getLines bytes len match isWebSocketsUpgrade lines with | true -> do! ns.AsyncWrite (handshake lines) let ws = WebSocket.CreateFromStream( (ns :> Stream), true, "n2o", TimeSpan(1, 0, 0)) sup.Post(Connect(inbox, ws)) if ticker then Async.Start(telemetry ws inbox ct sup, ct) return! looper ws size ct sup | _ -> tcp.Close() }), cancellationToken = ct )

Процесс телеметрии [T] слушает очередь, и на любое сообщение, шлет в вебсокет канал текстовое сообщение "TICK".

let telemetry (ws: WebSocket) (inbox: MailboxProcessor<Msg>) (ct: CancellationToken) (sup: MailboxProcessor<Sup>) = async { try while not ct.IsCancellationRequested do let! _ = inbox.Receive() do! send ws ct (Text "TICK") finally sup.Post(Disconnect <| inbox) ws.CloseAsync(WebSocketCloseStatus.PolicyViolation, "TELEMETRY", ct) |> ignore }

Главный цикл обработки сообщений [L], в котором происходит создание буферизированого WebSocket стрима, тип которого явно присутствует в протоколе супервижина. Также здесь выделяется глобальный для всего цикла буфер, куда копируются байты их сокета с помощью ReceiveAsync. При возникновении исключения происходит нотификация супервизора с помощью сообщения Close, которое сигнализирует о разрыве соединения, например в случае ошибки валидации UTF-8.

let looper (ws: WebSocket) (bufferSize: int) (ct: CancellationToken) (sup: MailboxProcessor<Sup>) = async { try let mutable bytes = Array.create bufferSize (byte 0) while not ct.IsCancellationRequested do let! result = ws.ReceiveAsync(ArraySegment<byte>(bytes), ct) |> Async.AwaitTask let recv = bytes.[0..result.Count - 1] match (result.MessageType) with | WebSocketMessageType.Text -> do! protocol (Text (Encoding.UTF8.GetString recv)) |> send ws ct | WebSocketMessageType.Binary -> do! protocol (Bin recv) |> send ws ct | WebSocketMessageType.Close -> () | _ -> printfn "PROTOCOL VIOLATION" finally sup.Post(Close <| ws) ws.CloseAsync(WebSocketCloseStatus.PolicyViolation, "LOOPER", ct) |> ignore }

Функции терминации канала наследуюет архаическое на мой взгляд разделение текстовых и бинарных сообщений. Как показывает практика трактовка всего как бинарных сообщений только улучшает семантику протокола.

let sendBytes (ws: WebSocket) ct bytes = ws.SendAsync(ArraySegment<byte>(bytes), WebSocketMessageType.Binary, true, ct) |> ignore let send ws ct (msg: Msg) = async { match msg with | Text text -> sendBytes ws ct (Encoding.UTF8.GetBytes text) | Bin arr -> sendBytes ws ct arr | Nope -> () }

Что дальше?

Дальше идут три фазы:

1) Контекст пира: порт, айпишник, хедеры, эндпойнт, служебная информация;
2) BERT сериализация для совместимости с клиентской инфраструктурой N2O;
3) Имплементация NITRO протокола.

Благодарности

Хочется поблагодарить всех, кто ставил лайки нашему проекту, а осоебнно Филлипа Картера, програмного менеджера .NET и F#  ❤  Мы чрезвычайно воодушевлены!

Авторы

Максим Сохацкий, Игорь Городецкий, Siegmentation Fault